package com.codejiwei.flink.practice;

import com.codejiwei.flink.entity.UserBehavior;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author jiwei
 * @description 统计网站总浏览量(PV)
 * @date 2023/5/22 17:48
 */
public class Flink_Project_Practice_PV {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        env.readTextFile("E:\\workspace\\ss-flink\\src\\main\\resources\\data\\UserBehavior.csv")
                .map(line -> {
                    String[] fields = line.split(",");
                    return new UserBehavior(Long.valueOf(fields[0]), Long.valueOf(fields[1]), Integer.valueOf(fields[2]), fields[3], Long.valueOf(fields[4]));
                })
                .filter(be -> "pv".equals(be.getBehavior()))
                .map(be -> Tuple2.of(be.getBehavior(), 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(value -> value.f0)
                .sum(1)
                .print();
        env.execute();

    }
}
